package com.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bean.VisitorStats;
import com.common.GlobalConfig;
import com.utils.ClickHouseUtil;
import com.utils.DateTimeUtil;
import com.utils.MyKafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import scala.Tuple4;

import java.time.Duration;
import java.util.Date;

/**
 * Flink job that aggregates visitor statistics and writes them to ClickHouse.
 *
 * <p>The statistics are unique visits, page views, session starts, user jumps and total page
 * duration. They are aggregated in 10-second tumbling event-time windows and keyed by the
 * dimensions {@code ar}, {@code ch}, {@code is_new} and {@code vc}.
 *
 * <p>Overall data flow: web/app -> Nginx -> Spring Boot -> Kafka (ODS) -> FlinkApp -> Kafka (DWD)
 * -> FlinkApp -> Kafka (DWM) -> FlinkApp -> ClickHouse.
 *
 * <p>Programs involved: mockLog -> Nginx -> Logger.sh -> Kafka (ZK) -> BaseLogApp -> Kafka (ZK)
 * -> uv/pv -> Kafka (ZK) -> VisitorStatsApp -> ClickHouse.
 *
 * <p>Contact: QQ 1667847363. Created on 2022/1/8 22:01.
 *
 * @author xiao kun tai
 */
public class VisitorStatsApp {

    /** DWD topic with one record per page view. */
    private static final String PAGE_VIEW_SOURCE_TOPIC = "dwd_page_log";

    /** DWM topic with one record per unique visitor. */
    private static final String UNIQUE_VISIT_SOURCE_TOPIC = "dwm_unique_visit";

    /** DWM topic with one record per detected user jump (bounce). */
    private static final String USER_JUMP_DETAIL_SOURCE_TOPIC = "dwm_user_jump_detail";

    /** Insert statement for the sink; its 12 placeholders match the 12 VisitorStats fields. */
    private static final String CLICKHOUSE_INSERT_SQL =
            "insert into visitor_stats_2021 values(?,?,?,?,?,?,?,?,?,?,?,?)";

    /**
     * Maximum out-of-orderness tolerated by the watermark.
     *
     * <p>NOTE(review): this is one second longer than the window. Presumably that is so the
     * user-jump records, which the upstream job emits only after its own detection period, still
     * land in their window. Confirm against the user-jump job.
     */
    private static final Duration MAX_OUT_OF_ORDERNESS = Duration.ofSeconds(11);

    /** Size of the tumbling aggregation window. */
    private static final Time WINDOW_SIZE = Time.seconds(10);

    public static void main(String[] args) throws Exception {
        // 1. Execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // In production, keep the parallelism equal to the Kafka partition count.
        env.setParallelism(1);

        // Checkpointing is currently disabled. To make the job resumable from a checkpoint or
        // savepoint, uncomment the configuration below.
        /*
        // Filesystem state backend (alternatives: memory, fs, rocksdb).
        env.setStateBackend(new FsStateBackend("hdfs://192.168.88.109:9820/gmall-flink/ck"));
        // Take a checkpoint every 5 seconds.
        env.enableCheckpointing(5000L);
        // Checkpoint consistency semantics.
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(10000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);
        // Keep the last checkpoint when the job is cancelled.
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Restart strategy used when recovering from a checkpoint.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,2000L));
        // HDFS user name.
        System.setProperty("HADOOP_USER_NAME", "root");
        */

        // 2. Kafka sources. All three share one consumer group.
        String groupId = "visitor_stats_app" + GlobalConfig.NUMBER;

        DataStreamSource<String> uvDS = env.addSource(MyKafkaUtil.getKafkaConsumer(UNIQUE_VISIT_SOURCE_TOPIC, groupId));
        DataStreamSource<String> ujDS = env.addSource(MyKafkaUtil.getKafkaConsumer(USER_JUMP_DETAIL_SOURCE_TOPIC, groupId));
        DataStreamSource<String> pvDS = env.addSource(MyKafkaUtil.getKafkaConsumer(PAGE_VIEW_SOURCE_TOPIC, groupId));

        // 3. Convert every stream to VisitorStats. Each record contributes a count of 1 to
        //    exactly the counter(s) its topic represents.

        // 3.1 Unique visits: uv_ct = 1.
        SingleOutputStreamOperator<VisitorStats> visitorStatsWithUvDS =
                uvDS.map(line -> toVisitorStats(JSON.parseObject(line), 1L, 0L, 0L, 0L, 0L));

        // 3.2 User jumps: uj_ct = 1.
        SingleOutputStreamOperator<VisitorStats> visitorStatsWithUjDS =
                ujDS.map(line -> toVisitorStats(JSON.parseObject(line), 0L, 0L, 0L, 1L, 0L));

        // 3.3 Page views: pv_ct = 1. A page with no previous page starts a session (sv_ct = 1).
        //     The page's during_time becomes dur_sum.
        SingleOutputStreamOperator<VisitorStats> visitorStatsWithPvDS = pvDS.map(line -> {
            JSONObject json = JSON.parseObject(line);
            JSONObject page = json.getJSONObject("page");
            String lastPageId = page.getString("last_page_id");
            long svCt = (lastPageId == null || lastPageId.isEmpty()) ? 1L : 0L;
            return toVisitorStats(json, 0L, 1L, svCt, 0L, page.getLong("during_time"));
        });

        // 4. Merge the three streams.
        DataStream<VisitorStats> unionDS = visitorStatsWithUvDS.union(
                visitorStatsWithUjDS,
                visitorStatsWithPvDS);

        // 5. Event time comes from the record's ts field. The watermark tolerates bounded
        //    out-of-orderness.
        SingleOutputStreamOperator<VisitorStats> visitorStatsWithWMDS =
                unionDS.assignTimestampsAndWatermarks(WatermarkStrategy
                        .<VisitorStats>forBoundedOutOfOrderness(MAX_OUT_OF_ORDERNESS)
                        .withTimestampAssigner((stats, recordTimestamp) -> stats.getTs()));

        // 6. Key by the dimension fields (ar, ch, is_new, vc).
        //    An anonymous class is used instead of a lambda so Flink can see the generic key type.
        //    NOTE(review): scala.Tuple4 is likely treated as a generic (Kryo-serialized) type by
        //    the Java API. org.apache.flink.api.java.tuple.Tuple4 would get Flink's dedicated
        //    tuple serializer.
        KeyedStream<VisitorStats, Tuple4<String, String, String, String>> keyedStream =
                visitorStatsWithWMDS
                        .keyBy(new KeySelector<VisitorStats, Tuple4<String, String, String, String>>() {
                            @Override
                            public Tuple4<String, String, String, String> getKey(VisitorStats value) throws Exception {
                                return new Tuple4<String, String, String, String>(value.getAr(),
                                        value.getCh(),
                                        value.getIs_new(),
                                        value.getVc());
                            }
                        });

        // 7. Tumbling event-time window. The ReduceFunction pre-aggregates incrementally. The
        //    WindowFunction then sees the single reduced element and adds the window bounds.
        WindowedStream<VisitorStats, Tuple4<String, String, String, String>, TimeWindow> windowedStream =
                keyedStream.window(TumblingEventTimeWindows.of(WINDOW_SIZE));

        SingleOutputStreamOperator<VisitorStats> result = windowedStream.reduce(
                (acc, next) -> {
                    // Accumulates into the first argument in place. This is only safe because
                    // each element belongs to exactly one (tumbling) window. Do not reuse this
                    // with sliding windows.
                    acc.setUv_ct(acc.getUv_ct() + next.getUv_ct());
                    acc.setPv_ct(acc.getPv_ct() + next.getPv_ct());
                    acc.setSv_ct(acc.getSv_ct() + next.getSv_ct());
                    acc.setUj_ct(acc.getUj_ct() + next.getUj_ct());
                    acc.setDur_sum(acc.getDur_sum() + next.getDur_sum());
                    return acc;
                },
                // Kept as an anonymous class: a lambda would lose the Collector's generic output
                // type to erasure.
                new WindowFunction<VisitorStats, VisitorStats, Tuple4<String, String, String, String>, TimeWindow>() {
                    @Override
                    public void apply(Tuple4<String, String, String, String> key,
                                      TimeWindow window,
                                      Iterable<VisitorStats> reduced,
                                      Collector<VisitorStats> out) throws Exception {
                        // Incremental reduction leaves exactly one element per key and window.
                        VisitorStats stats = reduced.iterator().next();
                        stats.setStt(DateTimeUtil.toYMDhms(new Date(window.getStart())));
                        stats.setEdt(DateTimeUtil.toYMDhms(new Date(window.getEnd())));
                        out.collect(stats);
                    }
                });

        // 8. Sink to ClickHouse. The stream is also printed for debugging.
        System.out.println("任务开始....................");
        result.print("result>>>>>>>>>>>>");
        result.addSink(ClickHouseUtil.getSink(CLICKHOUSE_INSERT_SQL));

        // 9. Launch the job.
        env.execute("VisitorStatsApp");
    }

    /**
     * Builds a {@link VisitorStats} record from one parsed log line.
     *
     * <p>The dimension fields {@code vc}, {@code ch}, {@code ar} and {@code is_new} are copied
     * from the {@code common} object, and the event time from the top-level {@code ts} field.
     * The window start and end are set to empty strings; the window function fills them in.
     *
     * @param json   the parsed log record; must contain a {@code common} object
     * @param uvCt   unique-visit count contributed by this record
     * @param pvCt   page-view count contributed by this record
     * @param svCt   session-start count contributed by this record
     * @param ujCt   user-jump count contributed by this record
     * @param durSum page duration contributed by this record
     * @return a new VisitorStats with empty window bounds
     * @throws NullPointerException if {@code json} has no {@code common} object
     */
    private static VisitorStats toVisitorStats(JSONObject json, long uvCt, long pvCt, long svCt,
                                               long ujCt, Long durSum) {
        JSONObject common = json.getJSONObject("common");
        return new VisitorStats("", "",
                common.getString("vc"),
                common.getString("ch"),
                common.getString("ar"),
                common.getString("is_new"),
                uvCt, pvCt, svCt, ujCt, durSum,
                json.getLong("ts"));
    }
}
